package com.flow.framework.common.stream;

import com.flow.framework.common.constant.FrameworkCommonConstant;
import com.flow.framework.common.stream.handler.BatchReturnProcessHandler;
import com.flow.framework.common.util.io.IoUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;

/**
 * 批量解码输入流包装
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/1/2
 */
@Slf4j
public class BatchProcessInputStream extends InputStream {

    private final ProcessCache processCache = new ProcessCache();

    private final InputStream srcInputStream;

    private final BatchReturnProcessHandler batchReturnProcessHandler;

    private boolean finish = false;

    public BatchProcessInputStream(InputStream srcInputStream, BatchReturnProcessHandler batchReturnProcessHandler) {
        IoUtil.checkInputStreamAndHandler(srcInputStream, batchReturnProcessHandler);
        this.batchReturnProcessHandler = batchReturnProcessHandler;
        this.srcInputStream = srcInputStream;
    }

    @Override
    public int read(byte[] buffer) throws IOException {
        return read(buffer, 0, buffer.length);
    }

    private int handleRead(byte[] targetBuffer, int bufferCurrent, int bufferLength, byte[] batchBuffer) throws IOException {
        // 当前targetBuffer的数据起始位置
        int targetCurrent = bufferCurrent;

        // 缓存的buffer
        byte[] cacheBuffer = processCache.getBuffer();

        // 已经复制的数据长度
        int copyLength = 0;

        // 如果缓存buffer不为空则开始复制数据
        if (null != cacheBuffer) {
            // 缓存buffer的数据起始位置
            int cacheCurrent = processCache.getCurrent();

            // 缓存buffer的数据有效长度
            int cacheLength = processCache.getLength();

            // 计算可复制的有效数据长度，如果targetBuffer剩余的长度小则取targetBuffer的剩余长度，如果缓存的长度小则取缓存的长度
            copyLength = Math.min(bufferLength - targetCurrent, cacheLength);

            // 如果可复制数据长度大于0，则开始复制数据到targetBuffer
            if (copyLength > 0) {
                System.arraycopy(cacheBuffer, cacheCurrent, targetBuffer, targetCurrent, copyLength);

                // 更新targetBuffer的数据起始位置
                targetCurrent += copyLength;

                // 更新缓存buffer的有效数据起始位置
                processCache.setCurrent(cacheCurrent + copyLength);

                // 更新缓存buffer的数据长度
                processCache.setLength(cacheLength - copyLength);
            }
        }

        // 如果targetBuffer的数据起始位加1小于targetBuffer的长度，则说明targetBuffer还可以继续填充数据
        // 且缓存buffer中已无有效数据，需重新加载
        if (targetCurrent + FrameworkCommonConstant.ARRAY_INDEX_LESS_SIZE_VALUE <= bufferLength) {

            // 创建原始数据buffer，即需要自定义处理的批量buffer
            byte[] srcBuffer = batchBuffer;
            if (null == srcBuffer) {
                srcBuffer = new byte[batchReturnProcessHandler.getBatchSize()];
            }

            // 读取原始流中的数据
            int readLen = srcInputStream.read(srcBuffer);

            // 如果流读取结束，则返回已经复制的数据长度，否则返回流读取结束值
            if (FrameworkCommonConstant.EOF == readLen) {
                finish = true;
                return copyLength;
            }


            // 将读取出来的数据复制后调用批量处理器
            byte[] srcTempBuffer = new byte[readLen];
            System.arraycopy(srcBuffer, 0, srcTempBuffer, 0, readLen);
            byte[] handledBuffer = batchReturnProcessHandler.process(srcTempBuffer);
            int handledBufferLength = handledBuffer.length;

            // 将处理完的buffer数据缓存到解码数据缓存对象中
            processCache.setBuffer(handledBuffer);
            processCache.setLength(handledBufferLength);
            processCache.setCurrent(0);

            // 返回当前已经复制的数据长度加上递归调用时复制的有效长度
            // 不需要考虑递归调用时返回-1的情况，因为targetBuffer数据没有满且缓存buffer中没有有效数据
            // 只可能是两种情况，1、targetBuffer中存在部分有效数据，则此时返回有效数据长度；2、targetBuffer中
            // 无有效数据，则此时一定是循环读取输入流的最后一次，即不会触发递归调用；故不需要考虑递归调用时返回-1情况

            // 递归调用，可能会存在多次，实际调用取决于原始读取出的数组数据大小和批处理数据的批量大小
            // 即：读取出的数组数据大小和批处理数据的批量大小除以他们所有公约数之后的两个数的乘积，则为递归调用的次数
            return copyLength + handleRead(targetBuffer, targetCurrent, bufferLength, srcBuffer);
        }

        // 返回已经复制的数据长度
        return copyLength;
    }

    @Override
    public int read(byte[] buffer, int off, int len) throws IOException {
        if (finish) {

            // 返回-1后将是否读取完成改为false，防止原始输入流是可重复读取流
            finish = false;
            return FrameworkCommonConstant.EOF;
        }
        return handleRead(buffer, off, len, null);
    }

    @Override
    public long skip(long n) throws IOException {
        return srcInputStream.skip(n);
    }

    @Override
    public int available() throws IOException {
        return srcInputStream.available();
    }

    @Override
    public void close() throws IOException {
        IoUtil.close(srcInputStream);
        super.close();
    }

    @Override
    public synchronized void mark(int readlimit) {
        srcInputStream.mark(readlimit);
    }

    @Override
    public synchronized void reset() throws IOException {
        srcInputStream.reset();
    }

    @Override
    public boolean markSupported() {
        return srcInputStream.markSupported();
    }

    /**
     * 该方法一般用来试探输入流中是否有数据
     *
     * @return 读取的字节
     * @throws IOException IOException
     */
    @Override
    public int read() throws IOException {
        byte[] buffer = new byte[FrameworkCommonConstant.ONE_ELEMENT_COLLECTION_SIZE];
        int len = read(buffer);
        if (FrameworkCommonConstant.EOF == len || 0 == len) {
            return FrameworkCommonConstant.EOF;
        }
        return buffer[0];
    }

    @Data
    @NoArgsConstructor
    private static class ProcessCache {
        private byte[] buffer;

        /**
         * 缓存数组中有效数据的起始位置
         */
        private int current;

        /**
         * 当前缓存数组中的有效数据长度
         */
        private int length;
    }
}